[AWS IoT Greengrass V2] ストリームマネージャーを使用してコンポーネントからKinesis Data Streamsへデータを送ってみました
1 はじめに
IoT事業部の平内(SIN)です。
AWS IoT Greengrass V2では、AWSから提供されるいくつかの事前構築済みコンポーネントがありますが、その中のストリームマネージャー (aws.greengrass.StreamManager) を使用すると、ストリームのための共通インタフェースが利用可能になります。
ストリームマネージャーでは、次の送信先に対応しています。
- AWS IoT Analytics
- Amazon Kinesis Data Streams
- AWS IoT SiteWise
- Amazon S3
今回は、上記のうち、Kienesis Data Streamsへのデータ送信を試してみました。
2 構成
試したみた構成は以下のような感じです。
カスタムのコンポネントでは、ストリームマネージャーSDKを使用してデータをストリームに送ります。 ストリームの送信先は、Kinesis Data Streams(stream-sample)に設定されており、同ストリームをトリガとしたLambdaを設置し、そのログでデータの到着を確認しています。
3 Kinesis Data Streams
シャード1で、ストリーム(stream-sample)を作成しています。
4 レシピ
ストリームマネージャーを使用する場合、依存関係で aws.greengrass.StreamManager を設定する必要があります。
また、ストリームマネージャ SDKを必要とするために、Artifactsで追加しています。
参考:ストリームマネージャーを使用するコンポーネントレシピの定義
--- RecipeFormatVersion: "2020-01-25" ComponentName: "com.example.KinesisSample" ComponentVersion: "1.0.9" ComponentType: "aws.greengrass.generic" ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: linux Lifecycle: Install: pip3 install --user -r {artifacts:path}/requirements.txt Run: | export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk python3 -u {artifacts:path}/kinesis_sample.py Artifacts: - URI: s3://bucketname/artifacts/com.example.KinesisSample/1.0.0/stream_manager_sdk.zip Unarchive: ZIP - URI: s3://bucketname/artifacts/com.example.KinesisSample/1.0.0/kinesis_sample.py - URI: s3://bucketname/artifacts/com.example.KinesisSample/1.0.0/requirements.txt
5 コード
コンポーネントのコードは、以下のとおりです。
kinesis_sample.py
import datetime import time from stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) stream_name = "MyStream" kinesis_stream_name = "stream-sample" client = StreamManagerClient() # ストリームが存在する場合は、一旦削除する try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass # ストリーム作成(Kinesis Data Streams) exports = ExportDefinition( kinesis=[KinesisConfig( identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name, batch_size=10 # 確認しやすいように、10件溜まったら送信するようにする )] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition = exports ) ) while(True): # 1秒間隔でタイムスタンプを送る timestamp = (datetime.datetime.now()).strftime('%Y/%m/%d %H:%M:%S') record = "timestamp: {}".format(timestamp) result = client.append_message(stream_name, record.encode("utf-8")) print(result) time.sleep(1) client.close()
ExportDefinition() で、ストリームの送信先としてKinesisを指定していますが、ここで指定するKinesisConfigが、Kinesisのストリームに対する細部の設定となります。
(一部抜粋)
- kinesis_stream_name: Kinesis Data Stremasのストリーム名
- batch_size: 蓄積サイズ(1〜500 デフォルト:500)
- batch_interval_millis: 蓄積時間(60000〜9223372036854)
- priority: アップロードの優先順位
batch_sizeと、batch_interval_millisの両方が設定されている場合、とちらかが条件にヒットした時、アップロードされます。デフォルトでは、データ500件毎にアップロードされますが、上記サンプルでは、動作確認がしやすいように、batch_size = 10としています。
6 ポリシー
IoT Greengrassが、Kinesis Data Stremasのストリームに書き込むためには、PutRecordsが必要です。
作成したポリシーです。
KinesisStreamPolicy
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:ap-northeast-1:*:stream/stream-sample" ] } ] }
そして、上記ポリシーをGreengrassV2TokenExchangeRoleに追加しています。
7 動作確認
動作確認のためにストリームをトリガとしたLambdaです。
import json import base64 def lambda_handler(event, context): if("Records" in event): records = event["Records"] print("records len:{}".format(len(records))) for i, record in enumerate(records): decoded_data = base64.b64decode(record["kinesis"]["data"]) print(decoded_data)
Lambdaのログから1秒毎のレコードが、10件溜まる毎に送られてきている様子を確認できます。
8 最後に
ストリームマネージャーを使用して、Kienesis Data Streamsへのデータ送信を試してみました。
単純にKinesisにデータを送りたいだけであれば、トークンマネージャーなどで権限を付与して、直接送ってもいいのですが、ストリームマネージャーを使用することが、以下のような効果が期待できます。
- 他のストリーム(S3や、AWS IoT Analyticsなど)との共通インターフェース
- ストレージタイプ、サイズ、データ保持に関するポリシーを定義できる
- エクスポート先、優先度、永続性などが定義できる
- ネットワーク切断時のリカバリなどの考慮が必要なくなる(ファイルにバッファリングされている)
言わば、「ストリームに関するマネージドサービスが、エッジ側で軽易に利用可能になる」と言えると思います。
9 参考リンク
[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました
[AWS IoT Greengrass V2] コンポーネントでコアデバイス間のPublish/Subscribeを試してみました
[AWS IoT Greengrass V2] ログマネージャでコンポーネントのログをCloudWatch Logsに送ってみました
[AWS IoT Greengrass V2] トークン交換サービスでコンポーネントからDynamoDBにアクセスしてみました